package com.hl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;

/**
 * JUnit-driven walkthrough of basic HBase client operations: namespace and table
 * administration plus put / get / scan / delete on a single table.
 *
 * <p>HBase is a distributed, scalable NoSQL database built for very large data sets.
 *
 * <p>Setup steps for a self-hosted cluster:
 * <ol>
 *   <li>Install and start Hadoop.</li>
 *   <li>Install and start ZooKeeper.</li>
 *   <li>Install HBase and start it with {@code ./hbase-daemon.sh start master} and
 *       {@code ./hbase-daemon.sh start regionserver}.</li>
 *   <li>Open a client shell with {@code ./hbase shell}.</li>
 * </ol>
 *
 * <p>Credentials are never stored in this file. Supply them through the JVM system
 * properties {@code hbase.client.username} / {@code hbase.client.password}, or through the
 * environment variables {@code HBASE_CLIENT_USERNAME} / {@code HBASE_CLIENT_PASSWORD}.
 * Any access key that was ever committed to version control should be treated as leaked
 * and rotated.
 */
public class HbaseDemo {
    /** Endpoint written to the {@code hbase.zookeeper.quorum} setting. */
    private static final String ENDPOINT =
            "https://sh-bp1j0i732sui5f7a9-hbase-serverless.hbase.rds.aliyuncs.com:443";

    private static Configuration conf;
    private static HBaseAdmin admin = null;
    private static Connection connection = null;

    /**
     * Builds the client configuration and opens the shared connection and admin handle.
     *
     * @throws IllegalStateException if a credential is missing or the connection cannot be
     *                               opened; failing here avoids misleading
     *                               {@code NullPointerException}s in every test afterwards
     */
    @BeforeClass
    public static void before() {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ENDPOINT);
        conf.set("hbase.client.username", requireSetting("hbase.client.username", "HBASE_CLIENT_USERNAME"));
        conf.set("hbase.client.password", requireSetting("hbase.client.password", "HBASE_CLIENT_PASSWORD"));
        conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());
        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = (HBaseAdmin) connection.getAdmin();
        } catch (IOException e) {
            throw new IllegalStateException("Unable to open the HBase connection", e);
        }
    }

    /**
     * Closes the admin handle and the shared connection.
     *
     * <p>Both references are null-checked because {@link #before()} may have failed before
     * assigning them.
     *
     * @throws IOException if closing either resource fails
     */
    @AfterClass
    public static void after() throws IOException {
        try {
            if (admin != null) {
                admin.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Looks up a required setting, first as a JVM system property and then as an
     * environment variable.
     *
     * @param propertyName system property name to check first
     * @param envName      environment variable name used as the fallback
     * @return the non-empty configured value
     * @throws IllegalStateException if neither source provides a value
     */
    private static String requireSetting(String propertyName, String envName) {
        String value = System.getProperty(propertyName);
        if (value == null || value.isEmpty()) {
            value = System.getenv(envName);
        }
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException(
                    "Missing HBase setting: pass -D" + propertyName + "=... or set " + envName);
        }
        return value;
    }

    /**
     * Creates a namespace unless it already exists.
     *
     * <p>Equivalent hbase shell commands:
     * <pre>
     * create_namespace 'test_namespace'
     * list_namespace
     * describe_namespace 'test_namespace'
     * </pre>
     *
     * <p>I/O failures are printed rather than thrown, so this method never propagates an
     * {@link IOException}.
     *
     * @param nameSpace name of the namespace to create
     */
    public static void createNameSpace(String nameSpace) {
        try {
            NamespaceDescriptor existing = admin.getNamespaceDescriptor(nameSpace);
            System.out.println("命名空间已经存在" + existing.getName());
        } catch (NamespaceNotFoundException e) {
            // "Not found" is the expected signal that the namespace still has to be created.
            try {
                admin.createNamespace(NamespaceDescriptor.create(nameSpace).build());
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void createNameSpaceTest() {
        createNameSpace("test_nameSpace");
    }


    /**
     * Creates a table with the given column families unless it already exists.
     *
     * <p>Equivalent hbase shell command:
     * {@code create 'test_namespace:test_table', 'test_f','test_f1'}
     *
     * @param tableName    table name, optionally prefixed with {@code namespace:}
     * @param columnFamily names of the column families to create
     * @throws IOException if the existence check or the creation fails
     */
    public static void createTable(String tableName, String... columnFamily) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            System.out.println("表" + tableName + "已存在");
            return;
        }
        HTableDescriptor descriptor = new HTableDescriptor(name);
        for (String cf : columnFamily) {
            descriptor.addFamily(new HColumnDescriptor(cf));
        }
        admin.createTable(descriptor);
        System.out.println("表" + tableName + "创建成功！");
    }

    @Test
    public void testCreateTable() throws IOException {
        HbaseDemo.createTable("test_table", "test_f");
    }

    /**
     * Disables and then deletes a table if it exists.
     *
     * <p>Equivalent hbase shell commands:
     * <pre>
     * disable 'test_namespace:test_table'
     * drop 'test_namespace:test_table'
     * </pre>
     *
     * @param tableName table name, optionally prefixed with {@code namespace:}
     * @throws IOException if any admin call fails
     */
    public static void dropTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (!admin.tableExists(name)) {
            System.out.println("表" + tableName + "不存在！");
            return;
        }
        // Disabling an already-disabled table is an error, so only disable when needed.
        if (admin.isTableEnabled(name)) {
            admin.disableTable(name);
        }
        admin.deleteTable(name);
        System.out.println("表" + tableName + "删除成功！");
    }

    @Test
    public void testDropTable() throws IOException {
        dropTable("test_table");
    }

    /**
     * Writes a single cell.
     *
     * <p>Equivalent hbase shell commands:
     * <pre>
     * put 'test_namespace:test_table', '1001', 'test_f:name', 'zhangsan'
     * put 'test_namespace:test_table', '1001', 'test_f:age', '19'
     * put 'test_namespace:test_table', '1002', 'test_f:name', 'lisi'
     * put 'test_namespace:test_table', '1002', 'test_f:age', '20'
     * </pre>
     *
     * @param tableName table to write to
     * @param rowkey    row key
     * @param cf        column family
     * @param cn        column qualifier
     * @param value     cell value
     * @throws IOException if the table cannot be opened or the put fails
     */
    public static void insertData(String tableName, String rowkey, String cf, String cn, String value) throws IOException {
        // Bytes.toBytes always encodes as UTF-8, matching the Bytes.toString calls used when reading.
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(put);
        }
    }

    @Test
    public void testInsertData() throws IOException {
        insertData("test_table", "1001", "test_f", "name", "张三");
        insertData("test_table", "1001", "test_f", "age", "13");
        insertData("test_table", "1002", "test_f", "name", "李四");
        insertData("test_table", "1002", "test_f", "age", "15");
    }

    /**
     * Scans the whole table and prints each raw {@link Result}.
     *
     * <p>Equivalent hbase shell command: {@code scan 'test_namespace:test_table'}
     *
     * <p>I/O failures are printed rather than thrown.
     *
     * @param tableName table to scan
     */
    public static void getNoDealData(String tableName) {
        try (Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                System.out.println("scan:  " + result);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void getNoDealDataTest() {
        getNoDealData("test_table");
    }


    /**
     * Fetches one row by key and prints its {@code name} and {@code age} columns.
     *
     * <p>Equivalent hbase shell command: {@code get 'test_namespace:test_table','1001'}
     *
     * @param tableName table to read from
     * @param rowKey    row key to look up
     * @throws IOException if the table cannot be opened or the get fails
     */
    public void getDataByRowKey(String tableName, String rowKey) throws IOException {
        StringBuilder sb = new StringBuilder();
        sb.append("rowkey:").append(rowKey).append("\t");
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            // Existence is determined from the returned Result; an empty Result may carry
            // no cell array at all, so it must not be iterated.
            if (!result.isEmpty()) {
                for (Cell cell : result.rawCells()) {
                    String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    if (colName.equals("name")) {
                        sb.append("name:").append(value).append("\t");
                    }
                    if (colName.equals("age")) {
                        sb.append("age:").append(value).append("\t");
                    }
                }
            }
        }
        System.out.println("查询结果：" + sb.toString());
    }

    @Test
    public void getDataByRowKeyTest() throws IOException {
        getDataByRowKey("test_table", "1001");
    }


    /**
     * Reads specific columns of one row and prints them as tab-separated
     * {@code qualifier:value} pairs, in the order the qualifiers were passed.
     *
     * <p>Equivalent hbase shell command:
     * {@code get 'test_namespace:test_table','1001','test_f:name'}
     *
     * <p>I/O failures are printed rather than thrown.
     *
     * @param tableName table to read from
     * @param rowKey    row key to look up
     * @param family    column family containing the requested columns
     * @param col       one or more column qualifiers
     * @throws IllegalArgumentException if no column qualifier is given
     */
    public void getCellData(String tableName, String rowKey, String family, String... col) {
        if (col == null || col.length == 0) {
            throw new IllegalArgumentException("At least one column qualifier is required");
        }
        byte[] familyBytes = Bytes.toBytes(family);
        Get get = new Get(Bytes.toBytes(rowKey));
        for (String qualifier : col) {
            get.addColumn(familyBytes, Bytes.toBytes(qualifier));
        }
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result res = table.get(get);
            if (res.isEmpty()) {
                System.out.println("查询结果不存在");
                return;
            }
            StringBuilder line = new StringBuilder();
            for (int i = 0; i < col.length; i++) {
                if (i > 0) {
                    line.append("\t");
                }
                byte[] cellValue = res.getValue(familyBytes, Bytes.toBytes(col[i]));
                // A column absent from the row yields null, which is printed as "null".
                line.append(col[i]).append(":").append(Bytes.toString(cellValue));
            }
            System.out.println(line);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void getCellDataTest() {
        getCellData("test_table", "1001", "test_f", "age", "name");
    }

    /**
     * Scans the whole table and prints, for every row, its key followed by the
     * {@code name} and {@code age} columns.
     *
     * @param tableName table to scan
     * @throws IOException if the table cannot be opened or the scan fails
     */
    public void getAllData(String tableName) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner results = table.getScanner(new Scan())) {
            for (Result result : results) {
                String row = Bytes.toString(result.getRow());
                System.out.println("用户名:" + row);
                StringBuilder sb = new StringBuilder();
                // The row key is emitted once per row, not once per cell.
                sb.append("rowkey:").append(row).append("\t");
                for (Cell cell : result.rawCells()) {
                    String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    if (colName.equals("name")) {
                        sb.append("value:").append(value).append("\t");
                    }
                    if (colName.equals("age")) {
                        sb.append("age:").append(value).append("\t");
                    }
                }
                System.out.println(sb.toString());
            }
        }
    }

    @Test
    public void getAllDataTest() throws IOException {
        getAllData("test_table");
    }

    /**
     * Deletes all versions of one column from a row.
     *
     * <p>Equivalent hbase shell commands:
     * <pre>
     * delete 'test_namespace:test_table', '1002','test_f'        (whole family)
     * delete 'test_namespace:test_table', '1002','test_f:name'   (single column)
     * </pre>
     *
     * @param tableName table to modify
     * @param rowKey    row key of the row to modify
     * @param cf        column family
     * @param cname     column qualifier to delete
     * @throws IOException if the table cannot be opened or the delete fails
     */
    public void deleteByRowKey(String tableName, String rowKey, String cf, String cname) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cname));
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.delete(delete);
        }
    }

    @Test
    public void deleteByRowKeyTest() throws IOException {
        deleteByRowKey("test_table", "1002", "test_f", "name");
    }

}
